package com.siyoumi.app.sys.service.netty;

import com.siyoumi.app.entity.IotDevice;
import com.siyoumi.app.modules.iot.entity.EnumIotDeviceLogType;
import com.siyoumi.app.modules.iot.entity.IotPinCode;
import com.siyoumi.app.modules.iot.service.DeviceActionHandle;
import com.siyoumi.app.modules.iot.service.SvcIotDevice;
import com.siyoumi.app.modules.iot.service.device_action.DeviceActionHandleState;
import com.siyoumi.app.netty.NettyMqttUtil;
import com.siyoumi.app.netty.NettyUtil;
import com.siyoumi.component.XRedis;
import com.siyoumi.util.LogMdc;
import com.siyoumi.util.XStr;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttTestHandler
        extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        LogMdc.setRequestId("0");
        log.debug("{}----链接创建：{}", ctx.channel().id(), ctx.channel().remoteAddress());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogMdc.setRequestId();
        if (null == msg) {
            super.channelRead(ctx, msg);
            return;
        }

        MqttMessage mqttMessage = (MqttMessage) msg;
        //log.info("message: {}", mqttMessage.toString());
        MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
        log.info("messageType: {}", mqttFixedHeader.messageType());


        Channel channel = ctx.channel();

        if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
            //	在一个网络连接上，客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
            //	to do 建议connect消息单独处理，用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
            NettyMqttUtil.connack(ctx, mqttMessage);
        }

        String queueId = NettyUtil.getOpenid(channel);
        log.info("queue_id: {}", queueId);
        switch (mqttFixedHeader.messageType()) {
            case PUBLISH:        //	客户端发布消息
                MqttPublishVariableHeader mqttPublishVariableHeader = (MqttPublishVariableHeader) mqttMessage.variableHeader();
                String text = NettyMqttUtil.toMessage(mqttMessage);
                log.info("msg: {}", text);
                log.info("topic: {}", mqttPublishVariableHeader.topicName());
                switch (mqttPublishVariableHeader.topicName()) {
                    case "siyoumi":
                        //设备反馈
                        IotPinCode iotPinCode = IotPinCode.parse(text);
                        if (XStr.hasAnyText(iotPinCode.getActionId())) {
                            XRedis.getBean().setEx(DeviceActionHandle.callbackKey(iotPinCode.getActionId()), text, 300);
                        }
                        if ("device_state".equals(iotPinCode.getAction())) {
                            // pass
                        } else {
                            SvcIotDevice.getBean().addLog(queueId, "pull", text);
                        }
                        break;

                    case "siyoumi/push":
                        //提交数据
                        SvcIotDevice.getBean().addLog(queueId, "push", text);
                        break;
                }

                //	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                NettyMqttUtil.puback(ctx, mqttMessage);
                break;
            case PUBREL:        //	发布释放
                //	PUBREL报文是对PUBREC报文的响应
                //	to do
                NettyMqttUtil.pubcomp(ctx, mqttMessage);
                break;
            case SUBSCRIBE:        //	客户端订阅主题
                //	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅，每个订阅注册客户端关心的一个或多个主题。
                //	为了将应用消息转发给与那些订阅匹配的主题，服务端发送PUBLISH报文给客户端。
                //	SUBSCRIBE报文也（为每个订阅）指定了最大的QoS等级，服务端根据这个发送应用消息给客户端
                // 	to do
                NettyMqttUtil.suback(ctx, mqttMessage);
                break;
            case UNSUBSCRIBE:    //	客户端取消订阅
                //	客户端发送UNSUBSCRIBE报文给服务端，用于取消订阅主题
                //	to do
                NettyMqttUtil.unsuback(ctx, mqttMessage);
                break;
            case PINGREQ:        //	客户端发起心跳
                //	客户端发送PINGREQ报文给服务端的
                //	在没有任何其它控制报文从客户端发给服务的时，告知服务端客户端还活着
                //	请求服务端发送 响应确认它还活着，使用网络以确认网络连接没有断开
                NettyMqttUtil.pingresp(ctx, mqttMessage);

                IotDevice entityDevice = SvcIotDevice.getBean().getEntity(queueId);
                if (entityDevice != null) {
                    //获取设备状态
                    DeviceActionHandleState handleState = new DeviceActionHandleState();
                    handleState.send(entityDevice);
                }
                break;
            case DISCONNECT:    //	客户端主动断开连接
                //	DISCONNECT报文是客户端发给服务端的最后一个控制报文， 服务端必须验证所有的保留位都被设置为0
                //	to do
                log.debug("{}----链接断开：{}", queueId, ctx.channel().remoteAddress());

                NettyMqttUtil.getMapChannel().remove(queueId);
                break;
            default:
                break;
        }
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LogMdc.setRequestId("0");
        super.channelInactive(ctx);

        String queueId = NettyUtil.getOpenid(ctx.channel());
        log.debug("{}----链接断开：{}", queueId, ctx.channel().remoteAddress());


        NettyMqttUtil.getMapChannel().remove(queueId);
    }
}
